/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <stdexcept>

#include <folly/CppAttributes.h>
#include <folly/GLog.h>
#include <folly/Optional.h>
#include <folly/io/async/WriteFlags.h>
#include <folly/lang/Exception.h>

namespace folly {

class AsyncSocketException;
class EventBase;
class AsyncTransport;
class AsyncSocket;

/**
 * Observer of socket events.
 */
class AsyncSocketObserverInterface {
 public:
  enum class Events {};
  AsyncSocketObserverInterface() = default;
  virtual ~AsyncSocketObserverInterface() = default;

  /**
   * Information provided to observer during prewrite event.
   *
   * Based on this information, an observer can build a PrewriteRequest.
   */
  struct PrewriteState {
    // raw byte stream offsets
    size_t startOffset{0};
    size_t endOffset{0};

    // flags already set
    WriteFlags writeFlags{WriteFlags::NONE};

    // timestamp recorded at the AsyncSocket layer
    //
    // supports sequencing of PrewriteState events and ByteEvents for debug
    std::chrono::steady_clock::time_point ts = {
        std::chrono::steady_clock::now()};
  };

  /**
   * Request that can be generated by observer in response to prewrite event.
   *
   * An observer can use a PrewriteRequest to request WriteFlags to be added
   * to a write and/or to request that the write be split up, both of which
   * can be used for timestamping.
   */
  struct PrewriteRequest {
    // offset to split write at; may be split at earlier offset by another req
    folly::Optional<size_t> maybeOffsetToSplitWrite;

    // write flags to be added if write split at requested offset
    WriteFlags writeFlagsToAddAtOffset{WriteFlags::NONE};

    // write flags to be added regardless of where write happens
    WriteFlags writeFlagsToAdd{WriteFlags::NONE};
  };

  /**
   * Container of PrewriteRequests passed on invocation of prewrite().
   *
   * If an observer wants to make a PrewriteRequest, it adds its request to this
   * container. Each added request is merged into a PrewriteRequest that can be
   * fetched via getMergedRequest().
   */
  class PrewriteRequestContainer {
   public:
    explicit PrewriteRequestContainer(
        const AsyncSocketObserverInterface::PrewriteState& prewriteState)
        : prewriteState_(prewriteState) {}

    /**
     * Add a PrewriteRequest to the container.
     */
    void addRequest(
        const AsyncSocketObserverInterface::PrewriteRequest& request) {
      mergedRequest_.writeFlagsToAdd |= request.writeFlagsToAdd;
      if (request.maybeOffsetToSplitWrite.has_value()) {
        CHECK_GE(
            prewriteState_.endOffset, request.maybeOffsetToSplitWrite.value());
        if (
            // case 1: offset not set in merged request
            !mergedRequest_.maybeOffsetToSplitWrite.has_value() ||
            // case 2: offset in merged request > offset in current request
            mergedRequest_.maybeOffsetToSplitWrite >
                request.maybeOffsetToSplitWrite) {
          mergedRequest_.maybeOffsetToSplitWrite =
              request.maybeOffsetToSplitWrite; // update
          mergedRequest_.writeFlagsToAddAtOffset =
              request.writeFlagsToAddAtOffset; // reset
        } else if (
            // case 3: offset in merged request == offset in current request
            request.maybeOffsetToSplitWrite ==
            mergedRequest_.maybeOffsetToSplitWrite) {
          mergedRequest_.writeFlagsToAddAtOffset |=
              request.writeFlagsToAddAtOffset; // merge
        }
        // case 4: offset in merged request < offset in current request
        // (do nothing)
      }

      // if maybeOffsetToSplitWrite points to end of the vector, remove the
      // split
      if (mergedRequest_.maybeOffsetToSplitWrite.has_value() && // explicit
          mergedRequest_.maybeOffsetToSplitWrite == prewriteState_.endOffset) {
        mergedRequest_.maybeOffsetToSplitWrite.reset(); // no split needed
      }
    }

    /**
     * Returns the merged PrewriteREquest representing action to take.
     *
     * The merged request has the split point at the earliest offset requested
     * across all requests, and has flags set to be the union of all timestamp
     * flags requested across all requests.
     *
     * Examples:
     *
     * - If there are two PrewriteRequests, one requesting we split on byte
     *   offset 20, and the other requesting a split on byte offset 30, then we
     *   will split on offset 20, as this is the earlier of the offsets.
     *
     * - If there are two PrewriteRequests, one requesting that we add the TX
     *   and ACK flags, and the other requesting just the SCHED flag, then we
     *   will add the TX, ACK, and SCHED flags to the request.
     */
    FOLLY_NODISCARD const PrewriteRequest& getMergedRequest() const {
      return mergedRequest_;
    }

   private:
    const PrewriteState& prewriteState_;
    PrewriteRequest mergedRequest_;
  };

  /**
   * Structure used to communicate ByteEvents, such as TX and ACK timestamps.
   */
  struct ByteEvent {
    // types of events; start from 0 to enable indexing in arrays
    enum Type : uint8_t {
      WRITE = 0,
      SCHED = 1,
      TX = 2,
      ACK = 3,
    };
    // type
    Type type;

    // offset of corresponding byte in raw byte stream
    size_t offset{0};

    // socket timestamp, as recorded by AsyncSocket implementation
    std::chrono::steady_clock::time_point ts = {
        std::chrono::steady_clock::now()};

    // kernel software timestamp for non-WRITE; for Linux this is CLOCK_REALTIME
    // see https://www.kernel.org/doc/Documentation/networking/timestamping.txt
    folly::Optional<std::chrono::nanoseconds> maybeSoftwareTs;

    // hardware timestamp for non-WRITE events; see kernel documentation
    // see https://www.kernel.org/doc/Documentation/networking/timestamping.txt
    folly::Optional<std::chrono::nanoseconds> maybeHardwareTs;

    // for WRITE events, the number of raw bytes written to the socket
    // optional to prevent accidental misuse in other event types
    folly::Optional<size_t> maybeRawBytesWritten;

    // for WRITE events, the number of raw bytes we tried to write to the socket
    // optional to prevent accidental misuse in other event types
    folly::Optional<size_t> maybeRawBytesTriedToWrite;

    // for WRITE ByteEvents, additional WriteFlags passed
    // optional to prevent accidental misuse in other event types
    folly::Optional<WriteFlags> maybeWriteFlags;

    /**
     * For WRITE events, returns if SCHED timestamp requested.
     */
    FOLLY_NODISCARD bool schedTimestampRequestedOnWrite() const {
      CHECK_EQ(Type::WRITE, type);
      CHECK(maybeWriteFlags.has_value());
      return isSet(*maybeWriteFlags, WriteFlags::TIMESTAMP_SCHED);
    }

    /**
     * For WRITE events, returns if TX timestamp requested.
     */
    FOLLY_NODISCARD bool txTimestampRequestedOnWrite() const {
      CHECK_EQ(Type::WRITE, type);
      CHECK(maybeWriteFlags.has_value());
      return isSet(*maybeWriteFlags, WriteFlags::TIMESTAMP_TX);
    }

    /**
     * For WRITE events, returns if ACK timestamp requested.
     */
    FOLLY_NODISCARD bool ackTimestampRequestedOnWrite() const {
      CHECK_EQ(Type::WRITE, type);
      CHECK(maybeWriteFlags.has_value());
      return isSet(*maybeWriteFlags, WriteFlags::TIMESTAMP_ACK);
    }
  };

  /**
   * close() will be invoked when the socket is being closed.
   *
   * Can be called multiple times during shutdown / destruction for the same
   * socket. Observers may detach after first call or track if event
   * previously observed.
   *
   * @param socket   Socket being closed.
   */
  virtual void close(AsyncSocket* /* socket */) noexcept {}

  /**
   * connectAttempt() will be invoked when connect() is called.
   *
   * Triggered before any application connection callback.
   *
   * @param socket   Socket that attempts to connect.
   */
  virtual void connectAttempt(AsyncSocket* /* socket */) noexcept {}

  /**
   * connectSuccess() will be invoked when connect() returns successfully.
   *
   * Triggered before any application connection callback.
   *
   * @param socket   Socket that has connected.
   */
  virtual void connectSuccess(AsyncSocket* /* socket */) noexcept {}

  /**
   * connectError() will be invoked when connect() returns an error.
   *
   * Triggered before any application connection callback.
   *
   * @param socket      Socket that has connected.
   * @param ex          Exception that describes why.
   */
  virtual void connectError(
      AsyncSocket* /* socket */,
      const AsyncSocketException& /* ex */) noexcept {}

  /**
   * Invoked when the socket is being attached to an EventBase.
   *
   * Called from within the EventBase thread being attached.
   *
   * @param socket      Socket with EventBase change.
   * @param evb         The EventBase being attached.
   */
  virtual void evbAttach(AsyncSocket* /* socket */, EventBase* /* evb */) {}

  /**
   * Invoked when the socket is being detached from an EventBase.
   *
   * Called from within the EventBase thread being detached.
   *
   * @param socket      Socket with EventBase change.
   * @param evb         The EventBase that is being detached.
   */
  virtual void evbDetach(AsyncSocket* /* socket */, EventBase* /* evb */) {}

  /**
   * Invoked each time a ByteEvent is available.
   *
   * Multiple ByteEvent may be generated for the same byte offset and event.
   * For instance, kernel software and hardware TX timestamps for the same
   * are delivered in separate CMsg, and thus will result in separate
   * ByteEvent.
   *
   * @param socket      Socket that ByteEvent is available for.
   * @param event       ByteEvent (WRITE, SCHED, TX, ACK).
   */
  virtual void byteEvent(
      AsyncSocket* /* socket */, const ByteEvent& /* event */) noexcept {}

  /**
   * Invoked if ByteEvents are enabled.
   *
   * Only called if the observer's configuration requested ByteEvents. May
   * be invoked multiple times if ByteEvent configuration changes (i.e., if
   * ByteEvents are enabled without hardware timestamps, and then enabled
   * with them).
   *
   * @param socket    Socket that ByteEvents are enabled for.
   */
  virtual void byteEventsEnabled(AsyncSocket* /* socket */) noexcept {}

  /**
   * Invoked if ByteEvents could not be enabled, or if an error occurred that
   * will prevent further delivery of ByteEvents.
   *
   * An observer may be waiting to receive a ByteEvent, such as an ACK event
   * confirming delivery of the last byte of a payload, before closing the
   * socket. If the socket has become unhealthy then this ByteEvent may
   * never occur, yet the handler may be unaware that the socket is
   * unhealthy if reads have been shutdown and no writes are occurring; this
   * observer signal breaks this 'deadlock'.
   *
   * @param socket      Socket that ByteEvents are now unavailable for.
   * @param ex          Details on why ByteEvents are now unavailable.
   */
  virtual void byteEventsUnavailable(
      AsyncSocket* /* socket */,
      const AsyncSocketException& /* ex */) noexcept {}

  /**
   * Invoked before each write to the socket if prewrite support enabled.
   *
   * The observer receives information about the pending write in the
   * PrewriteState and can request ByteEvents / socket timestamps by returning
   * a PrewriteRequest. The request contains the offset to split the write at
   * (if any) and WriteFlags to apply.
   *
   * PrewriteRequests are aggregated across observers. The write buffer is
   * split at the lowest offset returned by all observers. Flags are applied
   * based on configuration within the PrewriteRequest. Requests are not
   * sticky and expire after each write.
   *
   * Fewer bytes may be written than indicated in the PrewriteState or in the
   * PrewriteRequest split if the underlying socket / kernel
   * blocks on write.
   *
   * @param socket      Socket that ByteEvents are now unavailable for.
   * @param state       Pending write start and end offsets and flags.
   * @param container   Container of PrewriteRequests that observer can add to.
   */
  virtual void prewrite(
      AsyncSocket* /* transport */,
      const PrewriteState& /* state */,
      PrewriteRequestContainer& /* container */) {
    folly::terminate_with<std::runtime_error>(
        "prewrite() called but not defined");
  }

  /**
   * fdDetach() is invoked if the socket file descriptor is detached.
   *
   * detachNetworkSocket() will be triggered when a new AsyncSocket is being
   * constructed from an old one. See the moved() event for details about
   * this special case.
   *
   * @param socket      Socket for which detachNetworkSocket was invoked.
   */
  virtual void fdDetach(AsyncSocket* /* socket */) noexcept {}

  /**
   * fdAttach() is invoked when the socket file descriptor is attached.
   *
   * @param socket      Socket for which handleNetworkSocketAttached was
   * invoked.
   */
  virtual void fdAttach(AsyncSocket* /* socket */) noexcept {}

  /**
   * move() will be invoked when a new AsyncSocket is being constructed via
   * constructor AsyncSocket(AsyncSocket* oldAsyncSocket) from an AsyncSocket
   * that has an observer attached.
   *
   * This type of construction is common during TLS/SSL accept process.
   * wangle::Acceptor may transform an AsyncSocket to an AsyncFizzServer, and
   * then transform the AsyncFizzServer to an AsyncSSLSocket on fallback.
   * AsyncFizzServer and AsyncSSLSocket derive from AsyncSocket and at each
   * stage the aforementioned constructor will be called.
   *
   * Observers may be attached when the initial AsyncSocket is created, before
   * TLS/SSL accept handling has completed. As a result, AsyncSocket must
   * notify the observer during each transformation so that:
   *   (1) The observer can track these transformations for debugging.
   *   (2) The observer does not become separated from the underlying
   *        operating system socket and corresponding file descriptor.
   *
   * When a new AsyncSocket is being constructed via the aforementioned
   * constructor, the following observer events will be triggered:
   *   (1) fdDetach
   *   (2) move
   *
   * When move is triggered, the observer can CHOOSE to detach the old socket
   * and attach to the new socket. This process will not happen automatically;
   * the observer must explicitly perform these steps.
   *
   * @param oldSocket   Old socket that fd was detached from.
   * @param newSocket   New socket being constructed with fd attached.
   */
  virtual void move(
      AsyncSocket* /* oldSocket */, AsyncSocket* /* newSocket */) noexcept {}
};

} // namespace folly
